package com.zhengbo.simplerpc.server;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zhengbo.simplerpc.common.IMessageHandler;
import com.zhengbo.simplerpc.common.MessageHandlers;
import com.zhengbo.simplerpc.common.MessageInput;
import com.zhengbo.simplerpc.common.MessageRegistry;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * Created by zhengbo on 2019/8/21.
 */
@Slf4j
@ChannelHandler.Sharable
public class ServerMessageCollector extends ChannelInboundHandlerAdapter {


    private ThreadPoolExecutor executor;

    private MessageHandlers handlers;

    private MessageRegistry registry;

    public ServerMessageCollector(MessageHandlers handlers, MessageRegistry registry, int workThreads) {

        BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>(1000);

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("rpc-%s").build();

        this.executor = new ThreadPoolExecutor(1, workThreads, 1, TimeUnit.MINUTES, queue,
                threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
        this.handlers = handlers;
        this.registry = registry;
    }

    public void closeGraceFully() {

        this.executor.shutdown();

        try {
            this.executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("{}", e);
        }

        this.executor.shutdownNow();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("message collector channel active...");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        log.info("message collector channel in active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof MessageInput) {
            this.executor.execute(() -> {
                this.handlerMessage(ctx, (MessageInput) msg);
            });
        }
    }

    private void handlerMessage(ChannelHandlerContext ctx, MessageInput input) {

        Class<?> clazz = registry.get(input.getType());

        if (clazz == null) {
            handlers.defaultHandler().handler(ctx, input.getRequestId(), input);
            return;
        }

        Object payLoad = input.getPayLoad(clazz);
        IMessageHandler<Object> realHandler = (IMessageHandler<Object>) handlers.get(input.getType());

        if (realHandler != null) {
            realHandler.handler(ctx, input.getRequestId(), payLoad);
        } else {
            handlers.defaultHandler().handler(ctx, input.getRequestId(), input);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("connect error", cause);
    }
}
